import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 自定义UDF
 */
public  class UDFTimestampConverter extends ScalarFunction {

    /**
     * 默认转换为北京时间
     * @param timestamp Flink Timestamp 格式时间
     * @param format 目标格式,如"YYYY-MM-dd HH:mm:ss"
     * @return 目标时区的时间
     */
    public String eval(Timestamp timestamp, String format){

        LocalDateTime noZoneDateTime = timestamp.toLocalDateTime();
        ZonedDateTime utcZoneDateTime = ZonedDateTime.of(noZoneDateTime, ZoneId.of("UTC"));

        ZonedDateTime targetZoneDateTime = utcZoneDateTime.withZoneSameInstant(ZoneId.of("+08:00"));

        return targetZoneDateTime.format(DateTimeFormatter.ofPattern(format));
    }

    /**
     * 转换为指定时区时间
     * @param timestamp Flink Timestamp 格式时间
     * @param format 目标格式,如"YYYY-MM-dd HH:mm:ss"
     * @param zoneOffset 目标时区偏移量
     * @return 目标时区的时间
     */
    public String eval(Timestamp timestamp,String format,String zoneOffset){

        LocalDateTime noZoneDateTime = timestamp.toLocalDateTime();
        ZonedDateTime utcZoneDateTime = ZonedDateTime.of(noZoneDateTime, ZoneId.of("UTC"));

        ZonedDateTime targetZoneDateTime = utcZoneDateTime.withZoneSameInstant(ZoneId.of(zoneOffset));

        return targetZoneDateTime.format(DateTimeFormatter.ofPattern(format));
    }
}